 1.指标统计之快递单主题
   
   1).lg_ods层建表
   建表语句
   -- Customer address table (ODS layer). Data arrives as ','-delimited text,
   -- one dt partition per import date.
   -- The table is referenced as lg_ods.lg_address instead of wrapping database
   -- and table in one pair of backticks: `db.table` is a single identifier with
   -- a dot in it, which Hive handles inconsistently across versions (newer
   -- releases reject a dot inside a quoted table name).
   -- NOTE(review): fields are split on ','; a value that itself contains a
   -- comma (detail_addr, remark, ...) would shift the columns - confirm the data.
CREATE DATABASE IF NOT EXISTS lg_ods;
DROP TABLE IF EXISTS lg_ods.lg_address;
CREATE TABLE lg_ods.lg_address (
    `id` string,
    `name` string,
    `tel` string,
    `mobile` string,
    `detail_addr` string,
    `area_id` string,
    `gis_addr` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '客户地址表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
   
   -- Customer table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_customer rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_customer;
CREATE TABLE lg_ods.lg_customer (
    `id` string,
    `name` string,
    `tel` string,
    `mobile` string,
    `email` string,
    `type` string,
    `is_own_reg` string,
    `reg_dt` string,
    `reg_channel_id` string,
    `state` string,
    `cdt` string,
    `udt` string,
    `last_login_dt` string,
    `remark` string
)
COMMENT '客户表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Region table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_areas rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_areas;
CREATE TABLE lg_ods.lg_areas (
    `id` string,
    `name` string,
    `pid` string,
    `sname` string,
    `level` string,
    `citycode` string,
    `yzcode` string,
    `mername` string,
    `lng` string,
    `lat` string,
    `pinyin` string
)
COMMENT '区域表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
    
   -- Courier table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_courier rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
   -- NOTE(review): `birathday` looks like a misspelling of "birthday"; it is
   -- kept because other code may reference the column by this name.
DROP TABLE IF EXISTS lg_ods.lg_courier;
CREATE TABLE lg_ods.lg_courier (
    `id` string,
    `job_num` string,
    `name` string,
    `birathday` string,
    `tel` string,
    `pda_num` string,
    `car_id` string,
    `postal_standard_id` string,
    `work_time_id` string,
    `dot_id` string,
    `state` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '快递员表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Outlet (dot) table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_dot rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_dot;
CREATE TABLE lg_ods.lg_dot (
    `id` string,
    `dot_number` string,
    `dot_name` string,
    `dot_addr` string,
    `dot_gis_addr` string,
    `dot_tel` string,
    `company_id` string,
    `manage_area_id` string,
    `manage_area_gis` string,
    `state` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '网点表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Company-to-outlet mapping table (ODS layer), ','-delimited text
   -- partitioned by dt.
   -- Referenced as lg_ods.lg_company_dot_map rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_company_dot_map;
CREATE TABLE lg_ods.lg_company_dot_map (
    `id` string,
    `company_id` string,
    `dot_id` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '公司网点关联表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Company table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_company rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_company;
CREATE TABLE lg_ods.lg_company (
    `id` string,
    `company_name` string,
    `city_id` string,
    `company_number` string,
    `company_addr` string,
    `company_addr_gis` string,
    `company_tel` string,
    `is_sub_company` string,
    `state` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '公司表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Customer-to-address mapping table (ODS layer), ','-delimited text
   -- partitioned by dt.
   -- Referenced as lg_ods.lg_customer_address_map rather than one
   -- backtick-quoted `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_customer_address_map;
CREATE TABLE lg_ods.lg_customer_address_map (
    `id` string,
    `consumer_id` string,
    `address_id` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '客户地址关联表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Dictionary (code lookup) table (ODS layer), ','-delimited text
   -- partitioned by dt.
   -- Referenced as lg_ods.lg_codes rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_codes;
CREATE TABLE lg_ods.lg_codes (
    `id` string,
    `name` string,
    `type` string,
    `code` string,
    `code_desc` string,
    `code_type` string,
    `state` string,
    `cdt` string,
    `udt` string
)
COMMENT '字典表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Express bill fact table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_express_bill rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_express_bill;
CREATE TABLE lg_ods.lg_express_bill (
    `id` string,
    `express_number` string,
    `cid` string,
    `eid` string,
    `order_channel_id` string,
    `order_dt` string,
    `order_terminal_type` string,
    `order_terminal_os_type` string,
    `reserve_dt` string,
    `is_collect_package_timeout` string,
    `timeout_dt` string,
    `type` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '快递单据表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   -- Package fact table (ODS layer), ','-delimited text partitioned by dt.
   -- Referenced as lg_ods.lg_pkg rather than one backtick-quoted
   -- `db.table` identifier, which newer Hive releases reject.
DROP TABLE IF EXISTS lg_ods.lg_pkg;
CREATE TABLE lg_ods.lg_pkg (
    `id` string,
    `pw_bill` string,
    `pw_dot_id` string,
    `warehouse_id` string,
    `cdt` string,
    `udt` string,
    `remark` string
)
COMMENT '包裹表'
PARTITIONED BY (`dt` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

   2).数据采集
   业务数据保存在MySQL中，每日凌晨导入上一天的表数据。
   事实表
       全量导入
	脚本文件：lg_logstic/data_collect/all_import/import_express_bill.sh
#!/bin/bash
# Full import of the fact tables lg_express_bill and lg_pkg from MySQL into the
# lg_ods warehouse directories, followed by registering the dt partition in Hive.
#
# Usage: import_express_bill.sh [do_date]
#   do_date  partition value to load; defaults to yesterday formatted as YYYYMMDD.
#
# NOTE(review): partitions are written as dt=YYYYMMDD, while the ETL examples
# elsewhere in this document read dt='2020-06-02' - confirm the intended format.
source /etc/profile

# Use the first argument as the working date when given, otherwise yesterday.
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +"%Y%m%d")
fi

# sqoop and hive are called by name and must be on PATH; on the CDH host they
# live under /opt/cloudera/parcels/CDH/bin/.

# import_data TABLE QUERY
#   Runs QUERY against the lg_logistics MySQL database and writes the rows as
#   ','-delimited text to the dt=$do_date directory of lg_ods.TABLE, deleting
#   any previous content of that directory first.
#   QUERY must end with a WHERE clause because " and $CONDITIONS" is appended.
# NOTE(review): the password is visible on the command line; consider sqoop's
# --password-file option.
import_data() {
    sqoop import \
        --connect jdbc:mysql://linux123:3306/lg_logistics \
        --username root \
        --password 12345678 \
        --target-dir "/user/hive/warehouse/lg_ods.db/$1/dt=$do_date" \
        --delete-target-dir \
        --query "$2 and \$CONDITIONS" \
        --num-mappers 1 \
        --fields-terminated-by ',' \
        --null-string '\\N' \
        --null-non-string '\\N'
}

# Import each fact table in full. Stop at the first failed import so that no
# partition is registered for data that was not loaded.
partition_sql=""
for table in lg_express_bill lg_pkg; do
    import_data "$table" "select * from $table where 1=1" || exit 1
    partition_sql="${partition_sql}alter table lg_ods.${table} add if not exists partition(dt='${do_date}');"
done

# Sqoop only writes files; Hive sees them once the partition is registered.
# "if not exists" lets the script be re-run for a date that is already loaded.
hive -e "$partition_sql"
   
   增量导入
#!/bin/bash
# Incremental import of the fact tables lg_express_bill and lg_pkg: only rows
# whose cdt falls on the working date are copied from MySQL into the matching
# dt partition directory of lg_ods, then the partition is registered in Hive.
#
# Usage: <script> [do_date]
#   do_date  day to load, formatted as YYYYMMDD; defaults to yesterday.
#
# NOTE(review): partitions are written as dt=YYYYMMDD, while the ETL examples
# elsewhere in this document read dt='2020-06-02' - confirm the intended format.
source /etc/profile

# Use the first argument as the working date when given, otherwise yesterday.
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +"%Y%m%d")
fi

# sqoop and hive are called by name and must be on PATH; on the CDH host they
# live under /opt/cloudera/parcels/CDH/bin/.

# import_data TABLE QUERY
#   Runs QUERY against the lg_logistics MySQL database and writes the rows as
#   ','-delimited text to the dt=$do_date directory of lg_ods.TABLE, deleting
#   any previous content of that directory first.
#   QUERY must end with a WHERE clause because " and $CONDITIONS" is appended.
# NOTE(review): the password is visible on the command line; consider sqoop's
# --password-file option.
import_data() {
    sqoop import \
        --connect jdbc:mysql://linux123:3306/lg_logistics \
        --username root \
        --password 12345678 \
        --target-dir "/user/hive/warehouse/lg_ods.db/$1/dt=$do_date" \
        --delete-target-dir \
        --query "$2 and \$CONDITIONS" \
        --num-mappers 1 \
        --fields-terminated-by ',' \
        --null-string '\\N' \
        --null-non-string '\\N'
}

# Import the rows created on $do_date for each fact table. Stop at the first
# failed import so that no partition is registered for data that was not loaded.
partition_sql=""
for table in lg_express_bill lg_pkg; do
    import_data "$table" \
        "select * from $table WHERE DATE_FORMAT(cdt, '%Y%m%d') = '${do_date}'" || exit 1
    partition_sql="${partition_sql}alter table lg_ods.${table} add if not exists partition(dt='${do_date}');"
done

# Sqoop only writes files; Hive sees them once the partition is registered.
# The command is "hive": the earlier "$Hive" variable was never assigned (its
# definition is commented out), so the statement could not run.
# "if not exists" lets the script be re-run for a date that is already loaded.
hive -e "$partition_sql"

   维度表
       全量导入
   脚本文件：lg_logstic/data_collect/all_import/import_express_dim.sh
#!/bin/bash
# Full import of the dimension tables of the express-bill subject from MySQL
# into the lg_ods warehouse directories, followed by registering the dt
# partition of each table in Hive.
#
# Usage: import_express_dim.sh [do_date]
#   do_date  partition value to load; defaults to yesterday formatted as YYYYMMDD.
#
# NOTE(review): partitions are written as dt=YYYYMMDD, while the ETL examples
# elsewhere in this document read dt='2020-06-02' - confirm the intended format.
source /etc/profile

# Use the first argument as the working date when given, otherwise yesterday.
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +"%Y%m%d")
fi

# sqoop and hive are called by name and must be on PATH; on the CDH host they
# live under /opt/cloudera/parcels/CDH/bin/.

# import_data TABLE QUERY
#   Runs QUERY against the lg_logistics MySQL database and writes the rows as
#   ','-delimited text to the dt=$do_date directory of lg_ods.TABLE, deleting
#   any previous content of that directory first.
#   QUERY must end with a WHERE clause because " and $CONDITIONS" is appended.
# NOTE(review): the password is visible on the command line; consider sqoop's
# --password-file option.
import_data() {
    sqoop import \
        --connect jdbc:mysql://linux123:3306/lg_logistics \
        --username root \
        --password 12345678 \
        --target-dir "/user/hive/warehouse/lg_ods.db/$1/dt=$do_date" \
        --delete-target-dir \
        --query "$2 and \$CONDITIONS" \
        --num-mappers 1 \
        --fields-terminated-by ',' \
        --null-string '\\N' \
        --null-non-string '\\N'
}

# Dimension tables to import, in load order: customer address, warehouse,
# region, courier, outlet, company-outlet mapping, company, customer,
# customer-address mapping, dictionary.
dim_tables="lg_address lg_entrepots lg_areas lg_courier lg_dot lg_company_dot_map lg_company lg_customer lg_customer_address_map lg_codes"

# Import every table in full. Stop at the first failed import so that no
# partition is registered for data that was not loaded.
partition_sql=""
for table in $dim_tables; do
    import_data "$table" "select * from $table where 1=1" || exit 1
    # No partition is registered for lg_entrepots, matching the earlier
    # behaviour of this script.
    # NOTE(review): no lg_ods.lg_entrepots DDL appears in this document -
    # confirm whether that table should exist.
    if [ "$table" != "lg_entrepots" ]; then
        partition_sql="${partition_sql}alter table lg_ods.${table} add if not exists partition(dt='${do_date}');"
    fi
done

# Sqoop only writes files; Hive sees them once the partition is registered.
# "if not exists" lets the script be re-run for a date that is already loaded.
hive -e "$partition_sql"

   3).ETL
   (1).指标明细
   指标列表     维度
   最大快递单数 各类客户最大快递单数
                各渠道最大快递单数
                各网点最大快递单数
                各终端最大快递单数
   最小快递单数 各类客户最小快递单数
                各渠道最小快递单数
                各网点最小快递单数
                各终端最小快递单数
   平均快递单数 各类客户平均快递单数
                各渠道平均快递单数
                各网点平均快递单数
                各终端平均快递单数
   (2).表关系示意图
   事实表
   表名            描述
   lg_express_bill 快递单据表
   lg_pkg          包裹表
   维度表
   表名                    描述
   lg_customer             客户表
   lg_courier              快递员表
   lg_areas                区域表
   lg_dot                  网点表
   lg_company_dot_map      公司网点关联表
   lg_company              公司表
   lg_customer_address_map 客户地址关联表
   lg_address              客户地址表
   lg_codes                字典表
   (3).预处理
   创建lg_dwd层表
   1 、事实表
   事实表拉宽
   创建lg_dwd层表
-- DWD layer: wide fact table holding each express bill together with the
-- bill number and outlet id of its package row, stored as Parquet and
-- partitioned by dt.
-- "if not exists" keeps the script re-runnable once the database exists.
-- The table is referenced as lg_dwd.<table> rather than one backtick-quoted
-- `db.table` identifier, which newer Hive releases reject.
create database if not exists lg_dwd;
use lg_dwd;
drop table if exists lg_dwd.expression_lg_express_pkg;
create table lg_dwd.expression_lg_express_pkg (
    id string,
    express_number string,
    cid string,
    eid string,
    order_channel_id string,
    order_dt string,
    order_terminal_type string,
    order_terminal_os_type string,
    reserve_dt string,
    is_collect_package_timeout string,
    timeout_dt string,
    cdt string,
    udt string,
    remark string,
    pw_bill string,
    pw_dot_id string
)
comment '快递单据表-包裹关联表'
partitioned by (dt string)
stored as parquet;
   
   ETL-SQL
-- Rebuild one day of the wide fact table: every express bill of the day,
-- extended with the package columns whose pw_bill equals the bill's
-- express_number. Bills without a package row are kept with NULL pw_* values.
insert overwrite table lg_dwd.expression_lg_express_pkg partition (dt = '2020-06-02')
select
    bill.id,
    bill.express_number,
    bill.cid,
    bill.eid,
    bill.order_channel_id,
    bill.order_dt,
    bill.order_terminal_type,
    bill.order_terminal_os_type,
    bill.reserve_dt,
    bill.is_collect_package_timeout,
    bill.timeout_dt,
    bill.cdt,
    bill.udt,
    bill.remark,
    parcel.pw_bill,
    parcel.pw_dot_id
from (
    select
        id,
        express_number,
        cid,
        eid,
        order_channel_id,
        order_dt,
        order_terminal_type,
        order_terminal_os_type,
        reserve_dt,
        is_collect_package_timeout,
        timeout_dt,
        cdt,
        udt,
        remark
    from lg_ods.lg_express_bill
    where dt = '2020-06-02'
) bill
left join (
    select
        pw_bill,
        pw_dot_id
    from lg_ods.lg_pkg
    where dt = '2020-06-02'
) parcel
    on bill.express_number = parcel.pw_bill;
   
   2 、维度表
   DIM
   客户地址信息拉宽表
   创建表
-- DIM layer: customer widened with address detail (customer id, name,
-- address text and customer type), stored as Parquet and partitioned by dt.
-- The lg_dim database is created here if missing, because no other statement
-- in this document creates it.
create database if not exists lg_dim;
drop table if exists lg_dim.express_customer_address;
create table lg_dim.express_customer_address (
    id string,
    cname string,
    caddress string,
    type string
)
comment '快递单主题-客户地址信息'
partitioned by (dt string)
stored as parquet;

-- Rebuild one day of the customer/address dimension.
-- Customers are left-joined to their address mappings and then to the address
-- rows, so a customer appears once per mapped address and customers without
-- any mapping are kept with a NULL caddress.
insert overwrite table lg_dim.express_customer_address partition (dt = '2020-06-02')
select
    cust.id as id,
    cust.name as cname,
    addr.detail_addr as caddress,
    cust.type as type
from (
    select
        name,
        id,
        type
    from lg_ods.lg_customer
    where dt = '2020-06-02'
) cust
left join (
    select
        address_id,
        consumer_id
    from lg_ods.lg_customer_address_map
    where dt = '2020-06-02'
) cust_addr
    on cust.id = cust_addr.consumer_id
left join (
    select
        id,
        detail_addr
    from lg_ods.lg_address
    where dt = '2020-06-02'
) addr
    on cust_addr.address_id = addr.id;
   
   ETL-SQL
   网点公司信息拉宽表
   -- DIM layer: outlet widened with the name of its company (outlet id,
   -- outlet name, company name), stored as Parquet and partitioned by dt.
drop table if exists lg_dim.express_company_dot_addr;
create table lg_dim.express_company_dot_addr (
    id string,
    dot_name string,
    company_name string
)
comment '快递单主题-公司网点地址信息'
partitioned by (dt string)
stored as parquet;
   
   -- Rebuild one day of the outlet/company dimension.
   -- Outlets are left-joined to the company-outlet mapping and then to the
   -- company rows, so outlets without a mapping are kept with a NULL company_name.
insert overwrite table lg_dim.express_company_dot_addr partition (dt = '2020-06-02')
select
    outlet.id as id,
    outlet.dot_name as dot_name,
    comp.company_name as company_name
from (
    select
        id,
        dot_name
    from lg_ods.lg_dot
    where dt = '2020-06-02'
) outlet
left join (
    select
        dot_id,
        company_id
    from lg_ods.lg_company_dot_map
    where dt = '2020-06-02'
) outlet_comp
    on outlet.id = outlet_comp.dot_id
left join (
    select
        company_name,
        id
    from lg_ods.lg_company
    where dt = '2020-06-02'
) comp
    on comp.id = outlet_comp.company_id;
   4).指标统计
   计算的字段
   指标列表     维度
   快递单数     总快递单数
   最大快递单数 各类客户最大快递单数
                各渠道最大快递单数
                各网点最大快递单数
                各终端最大快递单数
   最小快递单数 各类客户最小快递单数
                各渠道最小快递单数
                各网点最小快递单数
                各终端最小快递单数
   平均快递单数 各类客户平均快递单数
                各渠道平均快递单数
                各网点平均快递单数
                各终端平均快递单数
   -- Create the lg_dws database (if missing) and the base aggregate table of
   -- the express-bill subject: the bill count per combination of customer
   -- type, outlet name, order channel and terminal type, stored as Parquet
   -- and partitioned by dt.
create database if not exists lg_dws;
drop table if exists lg_dws.express_base_agg;
create table lg_dws.express_base_agg (
    express_count bigint,
    customer_type string,
    dot_name string,
    channel_id string,
    terminal_type string
)
comment '快递单主题-初步汇总表'
partitioned by (dt string)
stored as parquet;

   -- Rebuild one day of the base aggregate: number of express bills per
   -- customer type, outlet name, order channel and terminal type.
   -- lg_dim.express_customer_address holds one row per customer/address pair,
   -- so joining it directly repeats a bill once per address of its customer.
   -- Each dimension is therefore reduced to distinct (key, attribute) rows,
   -- and bills are counted by distinct express_number so that any remaining
   -- row multiplication cannot inflate the count.
insert overwrite table lg_dws.express_base_agg partition (dt = '2020-06-02')
select
    count(distinct bill.express_number) as express_count,
    cust.type as customer_type,
    outlet.dot_name as dot_name,
    bill.order_channel_id as channel_id,
    bill.order_terminal_type as terminal_type
from (
    select
        express_number,
        cid,
        pw_dot_id,
        order_channel_id,
        order_terminal_type
    from lg_dwd.expression_lg_express_pkg
    where dt = '2020-06-02'
) bill
left join (
    select distinct
        id,
        type
    from lg_dim.express_customer_address
    where dt = '2020-06-02'
) cust
    on bill.cid = cust.id
left join (
    select distinct
        id,
        dot_name
    from lg_dim.express_company_dot_addr
    where dt = '2020-06-02'
) outlet
    on bill.pw_dot_id = outlet.id
group by
    cust.type,
    outlet.dot_name,
    bill.order_channel_id,
    bill.order_terminal_type;

   指标统计sql
   -- Maximum express-bill count by customer type: the customer type with the
   -- largest total for the day.
   -- The sort must be descending; with the default ascending order "limit 1"
   -- returns the smallest total instead of the largest.
   select
       sum(express_count) as customer_type_max_count,
       t.customer_type as customer_type
   from (
       select *
       from lg_dws.express_base_agg
       where dt = '2020-06-02'
   ) t
   group by t.customer_type
   order by customer_type_max_count desc
   limit 1;

   -- Maximum express-bill count by order channel: the channel with the largest
   -- total for the day.
   select
       sum(express_count) as channel_max_count,
       channel_id
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by channel_id
   order by channel_max_count desc
   limit 1;

   -- Maximum express-bill count by terminal type: the terminal type with the
   -- largest total for the day.
   select
       sum(express_count) as terminal_max_count,
       terminal_type
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by terminal_type
   order by terminal_max_count desc
   limit 1;

   -- Maximum express-bill count by outlet: the outlet name with the largest
   -- total for the day.
   select
       sum(express_count) as dot_max_count,
       dot_name
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by dot_name
   order by dot_max_count desc
   limit 1;

   -- Minimum express-bill counts.
   -- By customer type: the customer type with the smallest total for the day.
   select
       sum(express_count) as customer_type_min_count,
       customer_type
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by customer_type
   order by customer_type_min_count asc
   limit 1;

   -- Minimum express-bill count by order channel: the channel with the
   -- smallest total for the day.
   select
       sum(express_count) as channel_min_count,
       channel_id
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by channel_id
   order by channel_min_count asc
   limit 1;

   -- Minimum express-bill count by terminal type: the terminal type with the
   -- smallest total for the day.
   select
       sum(express_count) as terminal_min_count,
       terminal_type
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by terminal_type
   order by terminal_min_count asc
   limit 1;

   -- Minimum express-bill count by outlet: the outlet name with the smallest
   -- total for the day.
   select
       sum(express_count) as dot_min_count,
       dot_name
   from lg_dws.express_base_agg
   where dt = '2020-06-02'
   group by dot_name
   order by dot_min_count asc
   limit 1;

   -- ADS layer: one row of express-bill metrics per day. For each of customer
   -- type, channel, terminal type and outlet it stores the largest and the
   -- smallest daily total together with the value that produced it.
   -- Stored as ','-delimited text so that Sqoop can export the files directly;
   -- dt is an ordinary column here, not a partition.
   -- The lg_ads database is created here if missing, because no other
   -- statement in this document creates it.
create database if not exists lg_ads;
drop table if exists lg_ads.express_metrics;
create table lg_ads.express_metrics (
    customer_type_max_count bigint,
    customer_max_type string,
    channel_max_count bigint,
    channel_max_id string,
    terminal_max_count bigint,
    terminal_max_type string,
    dot_max_count bigint,
    dot_max_name string,
    customer_type_min_count bigint,
    customer_min_type string,
    channel_min_count bigint,
    channel_min_id string,
    terminal_min_count bigint,
    terminal_min_type string,
    dot_min_count bigint,
    dot_min_name string,
    dt string
)
comment '快递单主题-指标表'
row format delimited fields terminated by ','
stored as textfile;
   
   -- Summary statement: append the metrics row of one day to lg_ads.express_metrics.
   -- Each derived table below yields a single row (the top or bottom total of
   -- one dimension); cross-joining the eight of them gives one combined row.
   -- NOTE(review): "insert into" appends, so running this twice for the same
   -- day stores the day twice - confirm whether that is acceptable.
insert into table lg_ads.express_metrics
select
    cust_max.customer_type_max_count,
    cust_max.customer_type as customer_max_type,
    chan_max.channel_max_count,
    chan_max.channel_id as channel_max_id,
    term_max.terminal_max_count,
    term_max.terminal_type as terminal_max_type,
    dot_max.dot_max_count,
    dot_max.dot_name as dot_max_name,
    cust_min.customer_type_min_count,
    cust_min.customer_type as customer_min_type,
    chan_min.channel_min_count,
    chan_min.channel_id as channel_min_id,
    term_min.terminal_min_count,
    term_min.terminal_type as terminal_min_type,
    dot_min.dot_min_count,
    dot_min.dot_name as dot_min_name,
    '2020-06-02'
from (
    -- customer type with the largest total
    select sum(express_count) as customer_type_max_count, customer_type
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by customer_type
    order by customer_type_max_count desc
    limit 1
) cust_max
cross join (
    -- channel with the largest total
    select sum(express_count) as channel_max_count, channel_id
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by channel_id
    order by channel_max_count desc
    limit 1
) chan_max
cross join (
    -- terminal type with the largest total
    select sum(express_count) as terminal_max_count, terminal_type
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by terminal_type
    order by terminal_max_count desc
    limit 1
) term_max
cross join (
    -- outlet with the largest total
    select sum(express_count) as dot_max_count, dot_name
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by dot_name
    order by dot_max_count desc
    limit 1
) dot_max
cross join (
    -- customer type with the smallest total
    select sum(express_count) as customer_type_min_count, customer_type
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by customer_type
    order by customer_type_min_count asc
    limit 1
) cust_min
cross join (
    -- channel with the smallest total
    select sum(express_count) as channel_min_count, channel_id
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by channel_id
    order by channel_min_count asc
    limit 1
) chan_min
cross join (
    -- terminal type with the smallest total
    select sum(express_count) as terminal_min_count, terminal_type
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by terminal_type
    order by terminal_min_count asc
    limit 1
) term_min
cross join (
    -- outlet with the smallest total
    select sum(express_count) as dot_min_count, dot_name
    from lg_dws.express_base_agg
    where dt = '2020-06-02'
    group by dot_name
    order by dot_min_count asc
    limit 1
) dot_min;

   导出指标数据
   创建Mysql指标表
   -- MySQL target table for the exported express-bill metrics.
   -- Columns mirror lg_ads.express_metrics in the same order; dt is the
   -- primary key, so there is at most one row per day.
CREATE TABLE `express_agg_metrics` (
    -- largest totals and the value that produced each of them
    `customer_type_max_count` bigint(20)  DEFAULT NULL,
    `customer_max_type`       varchar(10) DEFAULT NULL,
    `channel_max_count`       bigint(20)  DEFAULT NULL,
    `channel_max_id`          varchar(10) DEFAULT NULL,
    `terminal_max_count`      bigint(20)  DEFAULT NULL,
    `terminal_max_type`       varchar(10) DEFAULT NULL,
    `dot_max_count`           bigint(20)  DEFAULT NULL,
    `dot_max_name`            varchar(25) DEFAULT NULL,
    -- smallest totals and the value that produced each of them
    `customer_type_min_count` bigint(20)  DEFAULT NULL,
    `customer_min_type`       varchar(10) DEFAULT NULL,
    `channel_min_count`       bigint(20)  DEFAULT NULL,
    `channel_min_id`          varchar(10) DEFAULT NULL,
    `terminal_min_count`      bigint(20)  DEFAULT NULL,
    `terminal_min_type`       varchar(10) DEFAULT NULL,
    `dot_min_count`           bigint(20)  DEFAULT NULL,
    `dot_min_name`            varchar(25) DEFAULT NULL,
    -- statistics date
    `dt`                      varchar(25) NOT NULL,
    PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

   使用Sqoop 工具导出
   脚本文件：lg_logstic/export_data/export_express.sh
#!/bin/bash
# Export the express-bill metrics from the Hive lg_ads layer to MySQL.
source /etc/profile

# sqoop is called by name and must be on PATH; on the CDH host it lives under
# /opt/cloudera/parcels/CDH/bin/.

# export_data MYSQL_TABLE ADS_TABLE
#   Exports the ','-delimited files of lg_ads.ADS_TABLE into MYSQL_TABLE of the
#   tbl_metrics database. Rows are matched on dt: an existing dt is updated,
#   a new dt is inserted.
#   The JDBC URL is kept on one line; a line break inside the quotes would
#   become part of the URL.
#   --input-null-string / --input-null-non-string turn the \N marker that Hive
#   writes for NULL back into a database NULL instead of exporting it as text.
# NOTE(review): the password is visible on the command line; consider sqoop's
# --password-file option.
export_data() {
    sqoop export \
        --connect "jdbc:mysql://linux123:3306/tbl_metrics?useUnicode=true&characterEncoding=utf-8" \
        --username root \
        --password 12345678 \
        --table "$1" \
        --export-dir "/user/hive/warehouse/lg_ads.db/$2/" \
        --num-mappers 1 \
        --input-fields-terminated-by ',' \
        --input-null-string '\\N' \
        --input-null-non-string '\\N' \
        --update-mode allowinsert \
        --update-key dt
}

# Export the express-bill metrics: lg_ads.express_metrics -> express_agg_metrics.
export_lg_express_metrics() {
    export_data express_agg_metrics express_metrics
}

# Run the export.
export_lg_express_metrics